speedup switch_on_next#682
Conversation
📝 Walkthrough📝 WalkthroughWalkthroughThe pull request introduces significant changes to the Changes
Possibly related PRs
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (1)
src/rpp/rpp/operators/switch_on_next.hpp (1)
16-16: Remove unnecessary include directiveThe header
<array>is included but not used in this file. Unnecessary includes can increase compilation times and introduce unwanted dependencies.Apply this diff to remove the unused include:
-#include <array>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/rpp/rpp/operators/concat.hpp(3 hunks)src/rpp/rpp/operators/switch_on_next.hpp(5 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (4)
src/rpp/rpp/operators/switch_on_next.hpp (1)
81-81: Verify the completion logic in 'on_completed'
The condition if (m_state->get_base_child_disposable().is_disposed()) checks if the base disposable is disposed before calling on_completed() on the observer. Please verify that this condition accurately reflects the intended behavior and that it doesn't lead to premature or missed completion notifications.
To confirm the correctness of this logic, you can search for all usages of on_completed in the codebase to see how it's handled elsewhere:
✅ Verification successful
Let me analyze the code further by checking the implementation of the switch_on_next operator.
Let me try a different approach to get the implementation details.
After analyzing the implementation, I can now provide a final response regarding the completion logic in switch_on_next operator.
The completion check in on_completed is correct and necessary
The condition if (m_state->get_base_child_disposable().is_disposed()) in the inner observer's on_completed() is a crucial part of the switch_on_next operator's completion logic. Here's why:
-
The operator manages two types of disposables:
- base_child_disposable: for the source observable of observables
- inner_child_disposable: for the currently active inner observable
-
The completion logic ensures that:
- When an inner observable completes, it only propagates completion if the base observable has already completed (base disposed)
- When the base observable completes, it only propagates completion if there's no active inner observable (inner disposed)
This prevents premature completion and ensures proper cleanup of resources in scenarios where:
- Multiple inner observables may complete
- The base observable completes while an inner observable is still active
- The last inner observable completes after the base observable
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for all usages of 'on_completed' to verify consistent behavior.
# Expected: Instances where 'on_completed' is called with appropriate disposal checks.
rg 'on_completed\(' -A 3
Length of output: 66953
Script:
#!/bin/bash
# Get the implementation of switch_on_next operator
ast-grep --pattern 'class switch_on_next_observer_strategy {
$$$
}'
Length of output: 71
Script:
#!/bin/bash
# Get the content of switch_on_next.hpp to understand the completion logic
cat src/rpp/rpp/operators/switch_on_next.hpp
Length of output: 7197
src/rpp/rpp/operators/concat.hpp (3)
15-15: Necessary inclusion of composite_disposable.hpp
Including <rpp/disposables/composite_disposable.hpp> is appropriate as rpp::composite_disposable is utilized within the file, specifically in managing child disposables.
38-38: Review inheritance change to base_disposable
The concat_disposable class now inherits from rpp::details::base_disposable instead of rpp::composite_disposable. This alteration impacts the disposal mechanism. Ensure that all disposal logic and resource management are correctly updated to reflect this change, and that no functionalities relying on the previous inheritance are adversely affected.
91-94: Ensure correct disposal in base_dispose_impl
The overridden base_dispose_impl method disposes of m_child_disposables by iterating over them and calling dispose() on each. Confirm that this implementation effectively releases all resources and aligns with the intended disposal behavior under the new inheritance structure.
| const auto inner = m_state->get_inner_child_disposable(); | ||
| inner->dispose(); | ||
| inner = rpp::composite_disposable_wrapper::make(); | ||
| std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, *inner}); |
There was a problem hiding this comment.
Ensure thread safety when modifying shared state
Modifying m_inner_child_disposable without proper synchronization may lead to data races in a multithreaded environment. Since m_inner_child_disposable is protected by a mutex, updates should be performed within a locked scope to ensure thread safety.
Consider locking the mutex before modifying m_inner_child_disposable:
{
auto inner_lock = m_state->get_inner_child_disposable().unique_lock();
*inner_lock = rpp::composite_disposable_wrapper::make();
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, **inner_lock});
}Ensure that you're using the appropriate locking mechanism provided by value_with_mutex or pointer_under_lock.
Fix assignment to 'inner' declared as const
The variable inner is declared as const auto at line 110 but is reassigned at line 112, which is not allowed and will result in a compilation error.
Apply this diff to remove the const qualifier:
-const auto inner = m_state->get_inner_child_disposable();
+auto inner = m_state->get_inner_child_disposable();
inner->dispose();
inner = rpp::composite_disposable_wrapper::make();
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, *inner});📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const auto inner = m_state->get_inner_child_disposable(); | |
| inner->dispose(); | |
| inner = rpp::composite_disposable_wrapper::make(); | |
| std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, *inner}); | |
| auto inner = m_state->get_inner_child_disposable(); | |
| inner->dispose(); | |
| inner = rpp::composite_disposable_wrapper::make(); | |
| std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, *inner}); |
| m_state->get_base_child_disposable().dispose(); | ||
| if (m_state->get_inner_child_disposable()->is_disposed()) |
There was a problem hiding this comment.
Ensure proper disposal order in 'on_completed'
In the on_completed method, disposing of m_state->get_base_child_disposable() before checking if m_inner_child_disposable is disposed might lead to race conditions or unintended behavior. Ensure that disposing the base disposable doesn't interfere with the inner disposable's state.
Consider reviewing the disposal logic to guarantee that all disposables are properly managed and that completion is signaled correctly.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
src/rpp/rpp/operators/switch_on_next.hpp (1)
25-25: Add override specifier to base_dispose_implThe virtual method implementation should be explicitly marked with the override specifier for better code clarity and to catch potential signature mismatches at compile-time.
- void base_dispose_impl(interface_disposable::Mode) noexcept override + void base_dispose_impl(interface_disposable::Mode) noexcept final overrideAlso applies to: 42-46
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/rpp/rpp/operators/switch_on_next.hpp(6 hunks)
🔇 Additional comments (3)
src/rpp/rpp/operators/switch_on_next.hpp (3)
61-63: LGTM: Improved move semantics
The change to accept the disposable wrapper by rvalue reference and move it into the member variable is a good optimization that avoids unnecessary copies.
126-127: Verify disposal order in on_completed
The disposal order in on_completed needs verification to prevent race conditions.
Let's verify the disposal order implementation in similar reactive operators:
#!/bin/bash
# Search for similar disposal patterns in other operators
rg -U "on_completed.*dispose.*is_disposed" --type cpp110-116:
Thread safety concern in on_next implementation
The previous review comment about thread safety is still valid. The disposable modification needs proper synchronization.
Consider using RAII-style locking:
- auto new_inner = rpp::composite_disposable_wrapper::make();
- {
- auto inner = m_state->get_inner_child_disposable();
- inner->dispose();
- *inner = new_inner;
- }
- std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, std::move(new_inner)});
+ auto new_inner = rpp::composite_disposable_wrapper::make();
+ {
+ auto inner_lock = m_state->get_inner_child_disposable().unique_lock();
+ inner_lock->dispose();
+ *inner_lock = new_inner;
+ std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, std::move(new_inner)});
+ }
BENCHMARK RESULTS (AUTOGENERATED)
|
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 298.96 ns | 1.55 ns | 1.85 ns | 0.83 |
| Subscribe empty callbacks to empty observable via pipe operator | 298.83 ns | 1.54 ns | 1.85 ns | 0.83 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 689.85 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 1032.11 ns | 3.42 ns | 3.42 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2210.27 ns | 117.32 ns | 118.49 ns | 0.99 |
| defer from array of 1 - defer + create + subscribe + immediate | 722.75 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2096.92 ns | 59.19 ns | 59.23 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 2950.12 ns | 32.42 ns | 32.40 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 30103.10 ns | 28303.52 ns | 27635.19 ns | 1.02 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 40801.68 ns | 50661.75 ns | 50572.00 ns | 1.00 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3532.40 ns | 133.63 ns | 140.57 ns | 0.95 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1091.45 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 858.31 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 992.74 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 897.73 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 1227.06 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 904.85 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1112.34 ns | 18.52 ns | 17.91 ns | 1.03 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 832.17 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 269.29 ns | 0.62 ns | 1.55 ns | 0.40 |
| current_thread scheduler create worker + schedule | 362.82 ns | 4.94 ns | 4.32 ns | 1.14 |
| current_thread scheduler create worker + schedule + recursive schedule | 845.85 ns | 60.82 ns | 60.48 ns | 1.01 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 835.55 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 893.71 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2373.93 ns | 155.48 ns | 139.77 ns | 1.11 |
| immediate_just+buffer(2)+subscribe | 1574.30 ns | 13.90 ns | 13.59 ns | 1.02 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2398.84 ns | 1331.48 ns | 1359.80 ns | 0.98 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 833.07 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 832.10 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2068.96 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3371.94 ns | 152.25 ns | 160.14 ns | 0.95 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3719.04 ns | 160.16 ns | 155.79 ns | 1.03 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 134.17 ns | 137.99 ns | 0.97 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3550.07 ns | 378.81 ns | 1254.32 ns | 0.30 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2100.80 ns | 211.05 ns | 210.73 ns | 1.00 |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3196.97 ns | 227.92 ns | 250.60 ns | 0.91 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 34.53 ns | 14.66 ns | 14.68 ns | 1.00 |
| subscribe 100 observers to publish_subject | 201547.83 ns | 17406.13 ns | 16449.17 ns | 1.06 |
| 100 on_next to 100 observers to publish_subject | 28020.83 ns | 17363.46 ns | 17198.60 ns | 1.01 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1352.89 ns | 13.28 ns | 12.67 ns | 1.05 |
| basic sample with immediate scheduler | 1511.98 ns | 5.55 ns | 5.24 ns | 1.06 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 910.24 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2094.91 ns | 1008.41 ns | 983.42 ns | 1.03 |
| create(on_error())+retry(1)+subscribe | 597.24 ns | 109.36 ns | 109.48 ns | 1.00 |
ci-macos
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 969.96 ns | 0.47 ns | 0.71 ns | 0.66 |
| Subscribe empty callbacks to empty observable via pipe operator | 970.00 ns | 0.47 ns | 0.70 ns | 0.67 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1982.77 ns | 0.23 ns | 0.23 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 2831.31 ns | 33.70 ns | 33.60 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 6583.36 ns | 356.80 ns | 315.88 ns | 1.13 |
| defer from array of 1 - defer + create + subscribe + immediate | 2106.08 ns | 0.29 ns | 0.23 ns | 1.23 |
| interval - interval + take(3) + subscribe + immediate | 5505.53 ns | 129.26 ns | 114.56 ns | 1.13 |
| interval - interval + take(3) + subscribe + current_thread | 6722.03 ns | 109.42 ns | 97.86 ns | 1.12 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 99219.36 ns | 90476.08 ns | 79561.79 ns | 1.14 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 99460.09 ns | 100005.40 ns | 87249.42 ns | 1.15 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 10578.93 ns | 447.97 ns | 359.45 ns | 1.25 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 10523.33 ns | 1.13 ns | 0.23 ns | 4.81 |
| immediate_just+filter(true)+subscribe | 2497.96 ns | 0.27 ns | 0.23 ns | 1.17 |
| immediate_just(1,2)+skip(1)+subscribe | 3346.84 ns | 0.26 ns | 0.23 ns | 1.10 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2066.78 ns | 0.48 ns | 0.47 ns | 1.02 |
| immediate_just(1,2)+first()+subscribe | 3592.43 ns | 0.28 ns | 0.23 ns | 1.18 |
| immediate_just(1,2)+last()+subscribe | 2672.26 ns | 0.27 ns | 0.23 ns | 1.13 |
| immediate_just+take_last(1)+subscribe | 17343.16 ns | 1.82 ns | 0.23 ns | 7.78 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 2410.43 ns | 0.24 ns | 0.23 ns | 1.04 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 963.62 ns | 1.04 ns | 0.93 ns | 1.11 |
| current_thread scheduler create worker + schedule | 1201.11 ns | 39.95 ns | 34.14 ns | 1.17 |
| current_thread scheduler create worker + schedule + recursive schedule | 2304.23 ns | 216.06 ns | 202.92 ns | 1.06 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 2505.90 ns | 4.51 ns | 4.21 ns | 1.07 |
| immediate_just+scan(10, std::plus)+subscribe | 2446.38 ns | 0.48 ns | 0.47 ns | 1.03 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 5608.64 ns | 399.90 ns | 375.74 ns | 1.06 |
| immediate_just+buffer(2)+subscribe | 2660.78 ns | 71.23 ns | 64.18 ns | 1.11 |
| immediate_just+window(2)+subscribe + subscsribe inner | 49135.11 ns | 2555.66 ns | 2385.97 ns | 1.07 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 2098.85 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 2373.99 ns | 0.23 ns | 0.23 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 4961.58 ns | 4.90 ns | 4.90 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 8556.70 ns | 463.69 ns | 417.88 ns | 1.11 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 10322.71 ns | 495.67 ns | 407.91 ns | 1.22 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 550.43 ns | 448.18 ns | 1.23 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 8006.05 ns | 930.42 ns | 1881.86 ns | 0.49 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 5509.45 ns | 857.89 ns | 822.09 ns | 1.04 |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 8339.62 ns | 727.86 ns | 674.38 ns | 1.08 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 85.00 ns | 50.02 ns | 49.39 ns | 1.01 |
| subscribe 100 observers to publish_subject | 2313147.00 ns | 231563.78 ns | 40905.05 ns | 5.66 |
| 100 on_next to 100 observers to publish_subject | 65960.30 ns | 33654.00 ns | 19300.43 ns | 1.74 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 3548.29 ns | 84.82 ns | 69.43 ns | 1.22 |
| basic sample with immediate scheduler | 3417.56 ns | 23.18 ns | 18.44 ns | 1.26 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 2493.45 ns | 0.25 ns | 0.23 ns | 1.06 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 6791.77 ns | 4166.03 ns | 4118.37 ns | 1.01 |
| create(on_error())+retry(1)+subscribe | 1890.32 ns | 288.51 ns | 278.47 ns | 1.04 |
ci-ubuntu-clang
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 275.90 ns | 1.54 ns | 0.63 ns | 2.43 |
| Subscribe empty callbacks to empty observable via pipe operator | 272.71 ns | 1.54 ns | 0.63 ns | 2.43 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 565.40 ns | 0.31 ns | 0.31 ns | 1.00 |
| from array of 1 - create + subscribe + current_thread | 791.16 ns | 4.02 ns | 4.01 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 2361.77 ns | 136.38 ns | 128.61 ns | 1.06 |
| defer from array of 1 - defer + create + subscribe + immediate | 792.51 ns | 0.31 ns | 0.31 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 2222.49 ns | 58.31 ns | 58.30 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3149.10 ns | 30.86 ns | 30.88 ns | 1.00 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 28736.43 ns | 27994.22 ns | 29074.54 ns | 0.96 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 36378.16 ns | 37941.45 ns | 37825.45 ns | 1.00 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3696.38 ns | 148.80 ns | 148.28 ns | 1.00 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1152.34 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 842.89 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1086.80 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 887.66 ns | 0.62 ns | 0.31 ns | 2.00 |
| immediate_just(1,2)+first()+subscribe | 1365.22 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1010.44 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 1204.05 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 868.54 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 277.65 ns | 0.63 ns | 1.54 ns | 0.41 |
| current_thread scheduler create worker + schedule | 394.03 ns | 4.02 ns | 4.01 ns | 1.00 |
| current_thread scheduler create worker + schedule + recursive schedule | 842.86 ns | 55.93 ns | 56.03 ns | 1.00 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 847.40 ns | 0.31 ns | 0.31 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 973.69 ns | 0.31 ns | 0.62 ns | 0.50 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 2256.85 ns | 141.02 ns | 140.60 ns | 1.00 |
| immediate_just+buffer(2)+subscribe | 1530.71 ns | 13.89 ns | 14.19 ns | 0.98 |
| immediate_just+window(2)+subscribe + subscsribe inner | 2423.45 ns | 924.29 ns | 918.06 ns | 1.01 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 842.21 ns | - | - | 0.00 |
| immediate_just+take_while(true)+subscribe | 844.79 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 2025.96 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3251.41 ns | 155.89 ns | 153.10 ns | 1.02 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3755.06 ns | 137.49 ns | 137.52 ns | 1.00 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 140.99 ns | 143.00 ns | 0.99 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3360.21 ns | 375.24 ns | 830.65 ns | 0.45 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 2203.38 ns | 200.77 ns | 204.17 ns | 0.98 |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3245.70 ns | 222.86 ns | 222.97 ns | 1.00 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 54.02 ns | 18.40 ns | 17.91 ns | 1.03 |
| subscribe 100 observers to publish_subject | 230564.00 ns | 18029.91 ns | 16076.98 ns | 1.12 |
| 100 on_next to 100 observers to publish_subject | 38792.70 ns | 27547.66 ns | 23499.42 ns | 1.17 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1298.93 ns | 11.40 ns | 11.42 ns | 1.00 |
| basic sample with immediate scheduler | 1306.07 ns | 5.24 ns | 6.18 ns | 0.85 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1115.10 ns | 0.39 ns | 0.31 ns | 1.27 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2209.24 ns | 1275.40 ns | 1165.87 ns | 1.09 |
| create(on_error())+retry(1)+subscribe | 1072.41 ns | 161.54 ns | 140.12 ns | 1.15 |
ci-windows
General
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| Subscribe empty callbacks to empty observable | 566.36 ns | 1.85 ns | 1.85 ns | 1.00 |
| Subscribe empty callbacks to empty observable via pipe operator | 591.72 ns | 1.85 ns | 1.85 ns | 1.00 |
Sources
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| from array of 1 - create + subscribe + immediate | 1159.53 ns | 5.24 ns | 5.55 ns | 0.95 |
| from array of 1 - create + subscribe + current_thread | 1436.98 ns | 15.75 ns | 15.75 ns | 1.00 |
| concat_as_source of just(1 immediate) create + subscribe | 3969.72 ns | 175.60 ns | 174.46 ns | 1.01 |
| defer from array of 1 - defer + create + subscribe + immediate | 1198.25 ns | 5.55 ns | 5.55 ns | 1.00 |
| interval - interval + take(3) + subscribe + immediate | 3032.08 ns | 141.07 ns | 140.85 ns | 1.00 |
| interval - interval + take(3) + subscribe + current_thread | 3410.33 ns | 60.63 ns | 60.24 ns | 1.01 |
| from array of 1 - create + as_blocking + subscribe + new_thread | 120671.43 ns | 112555.56 ns | 114088.89 ns | 0.99 |
| from array of 1000 - create + as_blocking + subscribe + new_thread | 138271.43 ns | 130377.78 ns | 130722.22 ns | 1.00 |
| concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5350.00 ns | 197.27 ns | 200.40 ns | 0.98 |
Filtering Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take(1)+subscribe | 1814.04 ns | 19.74 ns | 19.73 ns | 1.00 |
| immediate_just+filter(true)+subscribe | 1322.58 ns | 18.80 ns | 18.81 ns | 1.00 |
| immediate_just(1,2)+skip(1)+subscribe | 1713.56 ns | 18.52 ns | 18.51 ns | 1.00 |
| immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1375.97 ns | 23.46 ns | 23.45 ns | 1.00 |
| immediate_just(1,2)+first()+subscribe | 2371.46 ns | 17.28 ns | 17.28 ns | 1.00 |
| immediate_just(1,2)+last()+subscribe | 1730.73 ns | 18.52 ns | 18.51 ns | 1.00 |
| immediate_just+take_last(1)+subscribe | 2027.80 ns | 64.27 ns | 65.03 ns | 0.99 |
| immediate_just(1,2,3)+element_at(1)+subscribe | 1342.37 ns | 21.91 ns | 21.91 ns | 1.00 |
Schedulers
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate scheduler create worker + schedule | 711.79 ns | 4.32 ns | 4.01 ns | 1.08 |
| current_thread scheduler create worker + schedule | 656.42 ns | 11.17 ns | 11.61 ns | 0.96 |
| current_thread scheduler create worker + schedule + recursive schedule | 1372.41 ns | 103.88 ns | 104.64 ns | 0.99 |
Transforming Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+map(v*2)+subscribe | 1323.88 ns | 18.82 ns | 18.81 ns | 1.00 |
| immediate_just+scan(10, std::plus)+subscribe | 1742.05 ns | 21.28 ns | 20.99 ns | 1.01 |
| immediate_just+flat_map(immediate_just(v*2))+subscribe | 3495.77 ns | 183.98 ns | 179.90 ns | 1.02 |
| immediate_just+buffer(2)+subscribe | 2324.57 ns | 63.10 ns | 65.59 ns | 0.96 |
| immediate_just+window(2)+subscribe + subscsribe inner | 4039.62 ns | 1293.28 ns | 1342.91 ns | 0.96 |
Conditional Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+take_while(false)+subscribe | 1319.24 ns | 17.59 ns | 17.57 ns | 1.00 |
| immediate_just+take_while(true)+subscribe | 1320.50 ns | 18.82 ns | 18.81 ns | 1.00 |
Utility Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(1)+subscribe_on(immediate)+subscribe | 3262.32 ns | 11.10 ns | 11.10 ns | 1.00 |
Combining Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5087.68 ns | 204.52 ns | 196.94 ns | 1.04 |
| immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5464.93 ns | 192.48 ns | 182.58 ns | 1.05 |
| immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 186.56 ns | 207.02 ns | 0.90 |
| immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 5653.93 ns | 437.20 ns | 968.84 ns | 0.45 |
| immediate_just(1) + zip(immediate_just(2)) + subscribe | 3584.95 ns | 520.78 ns | 522.18 ns | 1.00 |
| immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 5197.12 ns | 321.43 ns | 333.45 ns | 0.96 |
Subjects
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| publish_subject with 1 observer - on_next | 36.76 ns | 20.60 ns | 20.11 ns | 1.02 |
| subscribe 100 observers to publish_subject | 262150.00 ns | 27500.00 ns | 29612.82 ns | 0.93 |
| 100 on_next to 100 observers to publish_subject | 51910.00 ns | 38727.59 ns | 35737.93 ns | 1.08 |
Scenarios
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| basic sample | 1914.53 ns | 95.40 ns | 96.09 ns | 0.99 |
| basic sample with immediate scheduler | 1923.50 ns | 68.51 ns | 68.33 ns | 1.00 |
Aggregating Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| immediate_just+reduce(10, std::plus)+subscribe | 1469.02 ns | 19.42 ns | 19.42 ns | 1.00 |
Error Handling Operators
| name | rxcpp | rpp | prev rpp | ratio |
|---|---|---|---|---|
| create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 1935.66 ns | 359.92 ns | 358.78 ns | 1.00 |
| create(on_error())+retry(1)+subscribe | 1624.76 ns | 139.03 ns | 139.24 ns | 1.00 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## v2 #682 +/- ##
==========================================
- Coverage 98.64% 98.60% -0.04%
==========================================
Files 155 155
Lines 9764 9775 +11
==========================================
+ Hits 9632 9639 +7
- Misses 132 136 +4 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
src/rpp/rpp/operators/switch_on_next.hpp (1)
Line range hint
25-52: LGTM! Consider adding documentation for the new members.The refactoring from
refcount_disposabletobase_disposablewith separate base and inner disposables improves the resource management design. The implementation correctly handles thread safety with mutex protection.Consider adding documentation comments for the new member variables and methods to explain their roles:
/// Base disposable for managing the main subscription rpp::composite_disposable m_base_child_disposable{}; /// Disposable for managing the current inner subscription, protected by mutex rpp::utils::value_with_mutex<rpp::composite_disposable_wrapper> m_inner_child_disposable{};src/tests/rpp/test_switch_on_next.cpp (1)
168-202: Good test coverage, but could be more descriptive and comprehensiveThe test cases effectively verify completion behavior in various scenarios. However, consider these improvements:
- Make test descriptions more descriptive:
- SUBCASE("switch_on_next completes right") + SUBCASE("switch_on_next handles completion signals correctly") - SUBCASE("on_completed from base") + SUBCASE("completes when base observable completes with no active inner observable") - SUBCASE("on_completed from inner + then from base") + SUBCASE("completes when inner observable completes followed by base observable completion") - SUBCASE("on_completed from base + then from inner") + SUBCASE("completes when base completes followed by active inner observable completion")
- Add comments explaining the expected behavior:
SUBCASE("switch_on_next handles completion signals correctly") { mock_observer<int> mock{}; trompeloeil::sequence s{}; rpp::subjects::publish_subject<rpp::dynamic_observable<int>> subj{}; subj.get_observable() | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock); + // Case 1: Base observable completes with no active inner observable + // Expected: Immediate completion SUBCASE("completes when base observable completes with no active inner observable") { REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); } + // Case 2: Inner observable completes, then base completes + // Expected: Completion only after base completes SUBCASE("completes when inner observable completes followed by base observable completion") { subj.get_observer().on_next(rpp::source::empty<int>()); REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); } + // Case 3: Base completes, then active inner observable completes + // Expected: Completion only after inner observable completes SUBCASE("completes when base completes followed by active inner observable completion") { subj.get_observer().on_next(rpp::source::empty<int>()); subj.get_observer().on_next(rpp::source::never<int>()); rpp::subjects::publish_subject<int> inner{}; subj.get_observer().on_next(inner.get_observable()); subj.get_observer().on_completed(); REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); inner.get_observer().on_completed(); } }
- Consider adding these edge cases:
- Multiple inner observables completing before base completion
- Error scenarios during completion
- Completion with buffered values
Would you like me to help implement the additional test cases?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/rpp/rpp/operators/switch_on_next.hpp(6 hunks)src/tests/rpp/test_switch_on_next.cpp(2 hunks)
🔇 Additional comments (4)
src/rpp/rpp/operators/switch_on_next.hpp (3)
61-63: LGTM! Good use of move semantics.
The change to take composite_disposable_wrapper by move improves efficiency and makes ownership transfer explicit.
110-119: LGTM! Thread-safe implementation with proper RAII.
The new implementation correctly handles thread safety with RAII-style locking and proper disposal state checks.
129-130:
Review disposal order in on_completed.
The current implementation disposes of the base disposable before checking the inner disposable's state. This could lead to a race condition where:
- Base disposable is disposed
- A new subscription arrives
- Inner disposable state is checked
- Completion signal is sent incorrectly
Consider reversing the order:
-m_state->get_base_child_disposable().dispose();
-if (m_state->get_inner_child_disposable()->is_disposed())
+auto inner_disposed = m_state->get_inner_child_disposable()->is_disposed();
+m_state->get_base_child_disposable().dispose();
+if (inner_disposed)
m_state->get_observer()->on_completed();Likely invalid or redundant comment.
src/tests/rpp/test_switch_on_next.cpp (1)
24-24: LGTM: Header inclusion for mocking framework
The inclusion of rpp_trompeloil.hpp is appropriate for the new test cases that use sequence verification.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
src/tests/rpp/test_switch_on_next.cpp (1)
144-175: LGTM: Thorough completion behavior testingThe new test case comprehensively verifies completion behavior in three key scenarios:
- Base observable completion
- Inner observable completion followed by base completion
- Base completion followed by inner observable completion
Consider adding these additional scenarios to make the test coverage even more robust:
- Multiple inner observables completing before base completion
- Error scenarios during completion
- Race conditions during completion
Example:
SUBCASE("multiple inner observables complete before base") { auto inner1 = rpp::subjects::publish_subject<int>{}; auto inner2 = rpp::subjects::publish_subject<int>{}; subj.get_observer().on_next(inner1.get_observable()); subj.get_observer().on_next(inner2.get_observable()); inner1.get_observer().on_completed(); inner2.get_observer().on_completed(); REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s); subj.get_observer().on_completed(); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/tests/rpp/test_switch_on_next.cpp(3 hunks)
🔇 Additional comments (2)
src/tests/rpp/test_switch_on_next.cpp (2)
23-23: LGTM: Clean mock setup with sequence verification
The changes improve the test setup by using a dedicated mock header and sequence verification for precise ordering of expectations.
Also applies to: 27-29
33-38: LGTM: Comprehensive test coverage with strict ordering
The modified test cases thoroughly verify the switch_on_next operator's behavior across different scenarios:
- Normal completion
- Error handling
- Empty observables
- Never-completing observables
The sequence verification ensures the emissions occur in the correct order.
Also applies to: 47-51, 60-65, 74-78, 87-91
|



#681
Summary by CodeRabbit
New Features
concatandswitch_on_nextoperators with improved disposal mechanisms.switch_on_nextoperator.Bug Fixes
switch_on_nextoperator to ensure proper handling of disposables.Tests
switch_on_nextoperator to validate completion behavior under various scenarios.